# Code updated: 2021 April 27

#### PREAMBLE ####

## Description:
# Uses multiple "database pulls" from WaterSmart to obtain as complete and lengthy panels as possible of:
# (1) WaterSmart reports sent, combined with treatment arm assignment "section" - keeping earliest version of each record only
# (2) Billed water "usage" gallons per day (gpd) by residence and billing period - keeping newest version of each record only
# (3) Residential participation in various water utility programs - keeping all records compiled across data pulls
# (4) Leak alerts sent by WaterSmart - keeping earliest version of each record only
# (5) Mostly-fixed residential characteristics such as lot size, bedrooms, etc. - keeping all records compiled across data pulls
# (6) Hourly consumption. Burbank: 2014-01-13 through 2016-10-05; Glendale: 2014-06-23 through 2010-10-26 - newest version only
# (7) Daily water consumption and/or GPD generated using billed usage files for the pre-hourly data period - newest version only

## Notes:
# 1. The Burbank violation warnings and associated enforcement started on around 2015-07-01.
# 2. The initial WaterSmart reports were sent in Glendale starting on 2014-08-26; sent in Burbank starting on 2015-04-20.
# 3. All four Glendale "account_groups" files are identical. Assuming the same for Burbank, as only October132015 is nonempty.
# 4. The "burbank_ws_medOctober072016.csv" and "glendale_ws_medOctober072016.csv" files are both missing thousands of sent reports.
#    The first stage is close to 100% using the older "burbank_ws_medOctober132015.csv" and "glendale_hwr_May222015.csv" files.
# 5. The "gwp_05222015/glendale_hwr_May222015.csv" data include only 1 WaterSmart report not in the other Glendale files, and
#  as this file includes much less detail (not even GPD), just excluding this one extra report observation.
# 6. Reasonable date range is GPD from 2013-04-01 through 2014-06-30; AMI from 2014-07-01 through 2016-09-30.

rm(list = ls()) # Clear workspace
wd = "/raid/data/WATER/data" ; setwd(wd) # Top-level directory containing WaterSmart data
pl = c('data.table', 'fasttime', 'zoo') ; lapply(pl, require, character.only=T) # Packages to load

#### (1) TREATMENT ARM ASSIGNMENT AND SENT REPORTS WITH RECOMMENDATIONS - EARLIEST VERSION OF EACH RECORD ONLY ####

# Experimental/Control section assignment per WaterSmart
arms.b <- fread(file.path(wd, 'bwp_10132015', 'burbank_account_groups_October132015.csv')) ; setkey(arms.b, residence_id)
arms.g <- fread(file.path(wd, 'gwp_05222015', 'glendale_account_groups_May222015.csv')) ; setkey(arms.g, residence_id)

# Burbank WaterSmart reports: unique to residence_id -by- send_timestamp
ws.b <- fread(file.path(wd, 'bwp_10132015', 'burbank_ws_medOctober132015.csv')) ; ws.b[, version := '20151013']
ws.b2 <- fread(file.path(wd, 'bwp_10072016', 'burbank_ws_medOctober072016.csv')) ; ws.b2[, version := '20161007']
ws.b3 <- fread(file.path(wd, 'bwp_11102016', 'burbank_ws_medNovember102016.csv')) ; ws.b3[, version := '20161110']
ws.b <- rbind(ws.b, ws.b2, ws.b3) ; rm(ws.b2, ws.b3)
setkey(ws.b, residence_id, send_timestamp, version)
ws.b <- ws.b[, .SD[1], by = .(residence_id, send_timestamp)] ; table(ws.b$version) # Keep only earliest version of each record

# Glendale WaterSmart reports: unique to residence_id -by- send_timestamp
ws.g <- fread(file.path(wd, 'gwp_11052015', 'glendale_ws_medNovember052015.csv')) ; ws.g[, version := '20151105']
ws.g$date <- NULL ; ws.g$first_waterscore <- NULL # Delete two variables present only in this version of the file
ws.g2 <- fread(file.path(wd, 'gwp_10072016', 'glendale_ws_medOctober072016.csv')) ; ws.g2[, version := '20161007']
ws.g3 <- fread(file.path(wd, 'gwp_11102016', 'glendale_ws_medNovember102016.csv')) ; ws.g3[, version := '20161110']
ws.g <- rbind(ws.g, ws.g2, ws.g3) ; rm(ws.g2, ws.g3)
setkey(ws.g, residence_id, send_timestamp, version)
ws.g <- ws.g[, .SD[1], by = .(residence_id, send_timestamp)] ; table(ws.g$version) # Keep only earliest version of each record

# Burbank "recommendations": unique to residence_id -by- mailing_id -by- content_id
recs.b <- fread(file.path(wd, 'bwp_10132015', 'burbank_recs_October132015.csv'))
recs.b2 <- fread(file.path(wd, 'bwp_10072016', 'burbank_recs_October072016.csv'))
recs.b3 <- fread(file.path(wd, 'bwp_11102016', 'burbank_recs_November102016.csv'))
recs.b <- unique(rbind(recs.b, recs.b2, recs.b3), by = NULL) ; rm(recs.b2, recs.b3)
setkey(recs.b, residence_id, channel, mailing_id) ; setkey(ws.b, residence_id, channel, mailing_id)
ws.b <- merge(ws.b, recs.b, all.x = T) ; rm(recs.b)

# Glendale "recommendations": unique to residence_id -by- mailing_id -by- content_id
# Note: 'gwp_05222015/glendale_recs_May222015.csv' does not add any records to those in the below included files
recs.g <- fread(file.path(wd, 'gwp_11052015', 'glendale_recs_November052015.csv'))
recs.g2 <- fread(file.path(wd, 'gwp_10072016', 'glendale_recs_October072016.csv'))
recs.g3<- fread(file.path(wd, 'gwp_11102016', 'glendale_recs_November102016.csv'))
recs.g <- unique(rbind(recs.g, recs.g2, recs.g3), by = NULL) ; rm(recs.g2, recs.g3)
setkey(recs.g, residence_id, channel, mailing_id) ; setkey(ws.g, residence_id, channel, mailing_id)
ws.g <- merge(ws.g, recs.g, all.x = T) ; rm(recs.g)

# Merge assigned treatment arms with WaterSmart reports + recommendations
reports.b <- merge(arms.b, ws.b, all = T) ; reports.g <- merge(arms.g, ws.g, all = T) ; rm(ws.b, ws.g, arms.b, arms.g)
reports.b[, city := 'Burbank'] ; reports.g[, city := 'Glendale']
reports <- rbind(reports.b, reports.g) ; rm(reports.b, reports.g)
reports[is.na(section), section := 'unassigned']
reports[city == 'Burbank' & section == 'experimental', arm := 'b1'][city == 'Burbank' & section == 'control', arm := 'b0']
reports[city == 'Glendale' & section == 'experimental', arm := 'g1'][city == 'Glendale' & section == 'control', arm := 'g0']
reports[city == 'Burbank' & section == 'unassigned', arm := 'bNA'][city == 'Glendale' & section == 'unassigned', arm := 'gNA']
setcolorder(reports, c('city', 'residence_id', 'arm', 'section', 'send_timestamp', 'version', 'label', 'waterscore', 'gpd', 
                       'channel', 'mailing_id', 'efficient_gpd_cutoff', 'display_median', 'action_gpd_cutoff', 'email_open',
                       'portallink_click', 'content_id', 'title'))
setkey(reports, city, residence_id, send_timestamp, content_id, title)

# Subset to treatment arms by city-by-residence_id (only) to merge to other data files in below code
arms <- unique(reports[, .(city, residence_id, arm)], by = NULL) ; setkey(arms, city, residence_id)

# Save to disk
fwrite(reports, file.path(wd, "BG_Arms_Reports_Recs.csv"))

#### (2) RESIDENTIAL BILLED WATER USE (GPD) - NEWEST VERSION OF EACH RECORD ONLY ####

# Burbank
bills.b <- fread(file.path(wd, 'bwp_10132015', 'burbank_usageOctober132015.csv')) ; bills.b[, version := '20151013']
bills.b2 <- fread(file.path(wd, 'bwp_10072016', 'burbank_usageOctober072016.csv')) ; bills.b2[, version := '20161007']
bills.b3 <- fread(file.path(wd, 'bwp_11102016', 'burbank_usageNovember102016.csv')) ; bills.b3[, version := '20161110']
bills.b <- rbind(bills.b, bills.b2, bills.b3) ; rm(bills.b2, bills.b3) ; bills.b[, readdate := as.Date(readdate)]
setkey(bills.b, residence_id, readdate, version)
bills.b <- bills.b[, .SD[.N], by = .(residence_id, readdate)] # Keep only newest version of each record

# Glendale
bills.g <- fread(file.path(wd, 'gwp_05222015', 'glendale_usageMay222015.csv')) ; bills.g[, version := '20150522']
bills.g2 <- fread(file.path(wd, 'gwp_10072016', 'glendale_usageOctober072016.csv')) ; bills.g2[, version := '20161007']
setnames(bills.g, 'id', 'residence_id') ; bills.g[, tier_number := NA]
bills.g <- rbind(bills.g, bills.g2) ; rm(bills.g2) ; bills.g[, readdate := as.Date(readdate)]
setkey(bills.g, residence_id, readdate, version)
bills.g <- bills.g[, .SD[.N], by = .(residence_id, readdate)] # Keep only newest version of each record

# Combine and merge to assigned treatment arms
bills.b[, city := 'Burbank'] ; bills.g[, city := 'Glendale'] ; bills <- rbind(bills.b, bills.g)
rm(bills.b, bills.g)
setkey(bills, city, residence_id) ; bills <- merge(bills, arms, all.x = T)
bills[is.na(arm) & city == 'Burbank', arm := 'bNA'][is.na(arm) & city == 'Glendale', arm := 'gNA']

setcolorder(bills, c('city', 'arm', 'residence_id', 'readdate', 'nthreading', 'nthreading_year', 'total_gallons',
                     'allocation', 'read_type', 'period_length', 'gpd', 'suspect_data_flag',
                     'suspect_data_reason', 'tier_number', 'version'))
setkey(bills, city, residence_id, readdate)

# Save to disk
fwrite(bills, file.path(wd, "BG_Arms_Billed_Usage.csv"))

#### (3) RESIDENTIAL PARTICIPATION IN VARIOUS WATER UTILITY PROGRAMS - ALL RECORDS COMPILED ACROSS DATA PULLS ####

# Burbank program lists
proglist.b <- fread(file.path(wd, 'bwp_10132015', 'burbank_utility_program_list_October132015.csv'))
proglist.b2 <- fread(file.path(wd, 'bwp_10072016', 'burbank_utility_program_list_October072016.csv'))
proglist.b3 <- fread(file.path(wd, 'bwp_11102016', 'burbank_utility_program_list_November102016.csv'))
proglist.b <- unique(rbind(proglist.b, proglist.b2, proglist.b3), by = NULL) ; rm(proglist.b2, proglist.b3)
setnames(proglist.b, 'id', 'utility_program_id') ; setkey(proglist.b, utility_program_id)

# Glendale program lists: note it's just the one, high-efficiency toilets
proglist.g <- fread(file.path(wd, 'gwp_11052015', 'glendale_utility_program_list_November052015.csv'))
proglist.g2 <- fread(file.path(wd, 'gwp_10072016', 'glendale_utility_program_list_October072016.csv'))
proglist.g3 <- fread(file.path(wd, 'gwp_11102016', 'glendale_utility_program_list_November102016.csv'))
proglist.g <- unique(rbind(proglist.g, proglist.g2, proglist.g3), by = NULL) ; rm(proglist.g2, proglist.g3)
setnames(proglist.g, 'id', 'utility_program_id') ; setkey(proglist.g, utility_program_id)

# Burbank program utilization
progpart.b <- fread(file.path(wd, 'bwp_10132015', 'burbank_utility_program_part_October132015.csv'))
progpart.b2 <- fread(file.path(wd, 'bwp_10072016', 'burbank_utility_program_part_October072016.csv'))
progpart.b3 <- fread(file.path(wd, 'bwp_11102016', 'burbank_utility_program_part_November102016.csv'))
progpart.b <- unique(rbind(progpart.b, progpart.b2, progpart.b3), by = NULL) ; rm(progpart.b2, progpart.b3)
setkey(progpart.b, utility_program_id) ; progpart.b <- merge(progpart.b, proglist.b) ; rm(proglist.b)

# Glendale program utilization - note glendale_utility_program_part_November052015 is empty.
progpart.g <- fread(file.path(wd, 'gwp_10072016', 'glendale_utility_program_part_October072016.csv'))
progpart.g2 <- fread(file.path(wd, 'gwp_11102016', 'glendale_utility_program_part_November102016.csv'))
progpart.g <- unique(rbind(progpart.g, progpart.g2), by = NULL) ; rm(progpart.g2)
setkey(progpart.g, utility_program_id) ; progpart.g <- merge(progpart.g, proglist.g) ; rm(proglist.g)

# Combine and merge to assigned treatment arms
progpart.b[, city := 'Burbank'] ; progpart.g[, city := 'Glendale'] ; progpart <- rbind(progpart.b, progpart.g)
rm(progpart.b, progpart.g)
setkey(progpart, city, residence_id) ; progpart <- merge(progpart, arms, all.x = T)
progpart[is.na(arm) & city == 'Burbank', arm := 'bNA'][is.na(arm) & city == 'Glendale', arm := 'gNA']
setcolorder(progpart, c('city', 'arm', 'residence_id', 'completion_date', 'utility_program_id', 'program_code', 
                        'program_description', 'program_type', 'notes'))
setkey(progpart, city, residence_id, completion_date, utility_program_id)

# Save to disk
fwrite(progpart, file.path(wd, "BG_Arms_Utility_Programs.csv"))

#### (4) RESIDENTIAL LEAK ALERTS SENT BY WATERSMART - EARLIEST VERSION OF EACH RECORD ONLY ####

# Burbank leak alerts
leaks.b <-  fread(file.path(wd, 'bwp_10072016', 'burbank_leak_alerts_October072016.csv')) ; leaks.b[, version := '20161007']
leaks.b2 <- fread(file.path(wd, 'bwp_11102016', 'burbank_leak_alerts_November102016.csv')) ; leaks.b2[, version := '20161110']
leaks.b <- unique(rbind(leaks.b, leaks.b2), by = NULL) ; rm(leaks.b2)
setkey(leaks.b, residence_id, start_interval_datetime, end_interval_datetime, version)
leaks.b <- leaks.b[, .SD[1], by = .(residence_id, start_interval_datetime, end_interval_datetime)]# Keep only earliest version

# Glendale leak alerts
leaks.g <-  fread(file.path(wd, 'gwp_05222015', 'glendale_leak_alerts_May222015.csv')) ; leaks.g[, version := '20150522']
leaks.g$response <- NULL
leaks.g2 <- fread(file.path(wd, 'gwp_10072016', 'glendale_leak_alerts_October072016.csv')) ; leaks.g2[, version := '20161007']
leaks.g3 <- fread(file.path(wd, 'gwp_11102016', 'glendale_leak_alerts_November102016.csv')) ; leaks.g3[, version := '20161110']
leaks.g <- unique(rbind(leaks.g, leaks.g2, leaks.g3), by = NULL) ; rm(leaks.g2, leaks.g3)
setkey(leaks.g, residence_id, start_interval_datetime, end_interval_datetime, version)
leaks.g <- leaks.g[, .SD[1], by = .(residence_id, start_interval_datetime, end_interval_datetime)] # Keep only earliest version

# Combine and merge to assigned treatment arms
leaks.b[, city := 'Burbank'] ; leaks.g[, city := 'Glendale'] ; leaks <- rbind(leaks.b, leaks.g) ; rm(leaks.g, leaks.b)
setkey(leaks, city, residence_id) ; leaks <- merge(leaks, arms, all.x = T)
leaks[is.na(arm) & city == 'Burbank', arm := 'bNA'][is.na(arm) & city == 'Glendale', arm := 'gNA']
setcolorder(leaks, c('city', 'arm', 'residence_id', 'version', 'start_interval_datetime', 'end_interval_datetime', 'leak_length',
                     'leak_rate', 'status', 'mailing_recipient_id', 'senddate', 'action', 'email_viewed', 'avg_gpd_lastyr'))
setkey(leaks, city, residence_id, start_interval_datetime, end_interval_datetime)

# Save to disk
fwrite(leaks, file.path(wd, "BG_Arms_Leak_Alerts.csv"))

#### (5) RESIDENTIAL/HOUSEHOLD CHARACTERISTICS SUCH AS LOT SIZE - ALL RECORDS COMPILED ACROSS DATA PULLS ####

# Burbank residence files
residence.b <-  fread(file.path(wd, 'bwp_10132015', 'burbank_residence_October132015.csv')) ; residence.b[, version := '20151013']
residence.b2 <- fread(file.path(wd, 'bwp_10072016', 'burbank_residence_October072016.csv')) ; residence.b2[, version := '20161007']
residence.b3 <- fread(file.path(wd, 'bwp_11102016', 'burbank_residence_November102016.csv')) ; residence.b3[, version := '20161110']
residence.b[, fee_code := NA]
residence.b <- unique(rbind(residence.b, residence.b2, residence.b3), by = NULL) ; rm(residence.b2, residence.b3)
setnames(residence.b, 'id', 'residence_id') ; setkey(residence.b, residence_id, version)

# Glendale residence files
residence.g <-  fread(file.path(wd, 'gwp_05222015', 'glendale_residence_May222015.csv')) ; residence.g[, version := '20150522']
residence.g2 <- fread(file.path(wd, 'gwp_11052015', 'glendale_residence_November052015.csv')) ; residence.g2[, version := '20151105']
residence.g3 <- fread(file.path(wd, 'gwp_10072016', 'glendale_residence_October072016.csv')) ; residence.g3[, version := '20161007']
residence.g4 <- fread(file.path(wd, 'gwp_11102016', 'glendale_residence_November102016.csv')) ; residence.g4[, version := '20161110']
residence.g[, fee_code := NA] ; residence.g2[, fee_code := NA]
setnames(residence.g, 'Lot Size', 'Lot Size (SqFt)') ; setnames(residence.g, 'Irrigable Area', 'Irrigable Area (SqFt)')
setnames(residence.g2, c('id', 'Num Occupants', 'Occupants Source', 'Home Size (SqFt)', 'Num Floors', 'Lot Size (SqFt)',
                         'Irrigable Area (SqFt)', 'Irrigable Area Source', 'Year Home Built', 'Num Bedrooms', 'Num Bathrooms',
                         'Residence Type', 'Has Logged In', 'version', 'fee_code'))
residence.g <- unique(rbind(residence.g, residence.g2, residence.g3, residence.g4), by = NULL)
rm(residence.g2, residence.g3, residence.g4)
setnames(residence.g, 'id', 'residence_id') ; setkey(residence.g, residence_id, version)

# Combine, and merge to assigned treatment arms
residence.b[, city := 'Burbank'] ; residence.g[, city := 'Glendale']
residence <- rbind(residence.b, residence.g) ; rm(residence.g, residence.b)
setkey(residence, city, residence_id) ; residence <- merge(residence, arms, all.x = T)
residence[is.na(arm) & city == 'Burbank', arm := 'bNA'][is.na(arm) & city == 'Glendale', arm := 'gNA']
setcolorder(residence, c('city', 'arm', 'residence_id', 'version', 'fee_code', 'Num Occupants', 'Occupants Source', 'Home Size (SqFt)', 
                         'Num Floors', 'Lot Size (SqFt)', 'Irrigable Area (SqFt)', 'Irrigable Area Source', 'Year Home Built', 
                         'Num Bedrooms', 'Num Bathrooms', 'Residence Type', 'Has Logged In'))
setkey(residence, city, residence_id, version)

# Save to disk
fwrite(residence, file.path(wd, "BG_Arms_Residence_Details.csv"))

#### (6) HOURLY RESIDENTIAL WATER CONSUMPTION - NEWEST VERSION OF EACH RECORD ONLY ####

## Burbank

# Burbank: 2015-10-13 files
burbank.20151013 <- lapply(1:12, function(m) {
  fread(file.path(wd, 'bwp_10132015', paste0('burbank_usage_interval_month', m, '_October142015.csv')))
})

# Burbank: 2016-10-07 files
burbank.20161007 <- lapply(1:12, function(m) {
  fread(file.path(wd, 'bwp_10072016', paste0('burbank_usage_interval_month', m, '_October072016.csv')))
})

# Combine the sets of Burbank files and keep unique
usage.b <- rbindlist(list(rbindlist(burbank.20151013), rbindlist(burbank.20161007)))
rm(burbank.20151013, burbank.20161007) ; gc()
setkey(usage.b, residence_id, read_datetime, gallons)
usage.b <- usage.b[, .SD[.N], by = .(residence_id, read_datetime)] ; gc() # Keep only newest version of each record

# Save to disk
fwrite(usage.b, file.path("Burbank_Hourly_Use.csv"))
rm(usage.b) ; gc()

## Glendale

# Glendale: 2015-05-22 files (note: these are different from the rest in format)
glendale.20150522 <- lapply(1:6, function(m) {
  f <- fread(file.path(wd, 'gwp_05222015', paste0('glendale_ui', m, '.txt')))
  f <- f[, .(residence_id, read_datetime, gallons)]
})

# Glendale: 2015-11-05 files
glendale.20151105 <- lapply(1:12, function(m) {
  fread(file.path(wd, 'gwp_11052015', paste0('glendale_usage_interval_month', m, '_November052015.csv')))
})

# Glendale: 2016-10-07 files
glendale.20161007 <- lapply(1:12, function(m) {
  fread(file.path(wd, 'gwp_10072016', paste0('glendale_usage_interval_month', m, '_October072016.csv')))
})

# Combine the sets of Glendale files and keep unique
usage.g <- unique(rbindlist(list(rbindlist(glendale.20150522), rbindlist(glendale.20151105), rbindlist(glendale.20161007))), by = NULL)
rm(glendale.20150522, glendale.20151105, glendale.20161007) ; gc()
setkey(usage.g, residence_id, read_datetime, gallons)
usage.g <- usage.g[, .SD[.N], by = .(residence_id, read_datetime)] ; gc() # Keep only newest version of each record

# Save to disk
fwrite(usage.g, file.path(wd, "Glendale_Hourly_Use.csv"))
rm(usage.g) ; gc()

#### (7) DAILY RESIDENTIAL WATER CONSUMPTION: TOTAL GPD, TOTAL DAILY, AND DAILY [TOTAL, BEFORE-9AM, AFTER-6PM] - NEWEST VERSION ONLY ####

## Billed usage data: use billed usage to interpolate gallons per date by residence -- dropping flagged suspect data

bills <- fread('BG_Arms_Billed_Usage.csv') ; bills[, readdate := as.Date(readdate)]

gpdcalc <- copy(bills[suspect_data_flag == 0 & !is.na(period_length), .(arm, residence_id, readdate, gpd)])
setkey(gpdcalc, arm, residence_id, readdate) ; setnames(gpdcalc, 'gpd', 'midgpd')
gpdcalc[, prevdate := shift(readdate), by = .(arm, residence_id)][, middate := prevdate + floor(difftime(readdate, prevdate) / 2)]
gpdcalc <- gpdcalc[!is.na(middate), .(arm, residence_id, date = middate, middate, midgpd)]
setkey(gpdcalc, arm, residence_id, date)

armresid <- gpdcalc[, .(mindate = min(date), maxdate = max(date), one = 1L), by = .(arm, residence_id)] ; setkey(armresid, one)
daterange <- data.table(date = seq(min(bills$readdate), max(bills$readdate), by = 'days'), one = 1L) ; setkey(daterange, one)
paneldate <- merge(armresid, daterange, allow.cartesian = T) ; rm(armresid, daterange)
paneldate <- paneldate[date >= mindate & date <= maxdate, .(arm, residence_id, date)] ; setkey(paneldate, arm, residence_id, date)
gc()

panelcalc <- merge(paneldate, gpdcalc, all.x = T) ; rm(paneldate, gpdcalc)
panelcalc[, prevmiddate := na.locf(middate), by = .(arm, residence_id)]
panelcalc[, prevmidgpd := na.locf(midgpd), by = .(arm, residence_id)]
panelcalc[, middate := na.locf(middate, fromLast = T), by = .(arm, residence_id)]
panelcalc[, midgpd := na.locf(midgpd, fromLast = T), by = .(arm, residence_id)]

# Linear interpolation between midpoints of billing cycle:
panelcalc[, difftime0 := as.numeric(difftime(date, prevmiddate, units = 'days'))]
panelcalc[, difftime1 := as.numeric(difftime(middate, date, units = 'days'))]
panelcalc[, tottime := difftime0 + difftime1]
panelcalc[tottime == 0, gpd := midgpd]
panelcalc[tottime != 0, gpd := difftime1/tottime * prevmidgpd + difftime0/tottime * midgpd]

# Can verify that interpolation worked by
test <- panelcalc[gpd > midgpd + 0.0000001 & gpd > prevmidgpd + 0.0000001] # Should have no obs.
length(test$gpd) ; rm(test)
# View(panelcalc[residence_id == 4]) # As one example, to see interpolation

# For daily panel
gpdpanel <- panelcalc[, .(arm, residence_id, date, gpd)] ; rm(panelcalc) ; setkey(gpdpanel, arm, residence_id, date)

## Hourly data

# Burbank: Convert hourly residential usage to daily total and total pre-9AM and total post-6PM
hourly.b <- fread(file.path(wd, 'Burbank_Hourly_Use.csv')) # Includes only newest version of each record
hourly.b[, date := substr(read_datetime, 1, 10)][, hod := as.integer(substr(read_datetime, 12, 13))]
setkey(hourly.b, residence_id, date, hod)
hourly.b[, b9gals := as.integer(0)][hod <= 8, b9gals := gallons]
hourly.b[, a6gals := as.integer(0)][hod >= 18, a6gals := gallons]
daily.b <- hourly.b[, .(gallons = sum(gallons), b9gals = sum(b9gals), a6gals = sum(a6gals)), by = .(residence_id, date)]
daily.b[, date := as.Date(date)]
rm(hourly.b) ; gc()

# Glendale: Convert hourly residential usage to daily total and total pre-9AM and total post-6PM
hourly.g <- fread(file.path(wd, 'Glendale_Hourly_Use.csv')) # Includes only newest version of each record
hourly.g[, date := substr(read_datetime, 1, 10)][, hod := as.integer(substr(read_datetime, 12, 13))]
setkey(hourly.g, residence_id, date, hod)
hourly.g[, b9gals := as.integer(0)][hod <= 8, b9gals := gallons]
hourly.g[, a6gals := as.integer(0)][hod >= 18, a6gals := gallons]
daily.g <- hourly.g[, .(gallons = sum(gallons), b9gals = sum(b9gals), a6gals = sum(a6gals)), by = .(residence_id, date)]
daily.g[, date := as.Date(date)]
rm(hourly.g) ; gc()

# Merge to assigned treatment arms and combine Burbank and Glendale
barms <- arms[city == 'Burbank', .(arm, residence_id)] ; setkey(barms, residence_id) ; setkey(daily.b, residence_id)
garms <- arms[city == 'Glendale', .(arm, residence_id)] ; setkey(garms, residence_id) ; setkey(daily.g, residence_id)
daily.b <- merge(daily.b, barms, all.x = T) ; daily.b[is.na(arm), arm := 'bNA'] ; rm(barms)
daily.g <- merge(daily.g, garms, all.x = T) ; daily.g[is.na(arm), arm := 'gNA'] ; rm(garms)
daily <- rbindlist(list(daily.b, daily.g)) ; rm(daily.b, daily.g)

## Merge/append interpolated and daily-from-hourly data

setkey(daily, arm, residence_id, date) ;  setkey(gpdpanel, arm, residence_id, date)
dailycon <- merge(gpdpanel, daily, all = T) ; rm(daily, gpdpanel)
setcolorder(dailycon, c('arm', 'residence_id', 'date', 'gpd', 'gallons', 'b9gals' , 'a6gals'))

# Check - should be no obs.
test <- dailycon[is.na(gpd) & is.na(gallons)] ; length(test$date) ; rm(test)

# Daily gallons: using GPD from 2013-04-01 through 2014-06-30; AMI from 2014-07-01 through 2016-09-30.
dailycon[date < as.Date('2014-07-01'), dailygal := gpd]
dailycon[date >= as.Date('2014-07-01'), dailygal := as.numeric(gallons)]

# Save to disk
fwrite(dailycon, file.path(wd, "BG_Arms_Daily_Consumption.csv"))

#### CHECKS ####

table(arms$arm, useNA = 'always') # * Check that none are now NA *
table(bills$arm, useNA = 'always') # * Check that none are now NA *
table(dailycon$arm, useNA = 'always') # * Check that none are now NA *
table(leaks$arm, useNA = 'always') # * Check that none are now NA *
table(progpart$arm, useNA = 'always') # * Check that none are now NA *
table(reports$arm, useNA = 'always') # * Check that none are now NA *
table(residence$arm, useNA = 'always') # * Check that none are now NA *

## END
